使用 select 函数实现更为精确的延时

select 初识

1
2
3
4
5
6
7
8
9
10
11
12
int select(
_In_ int nfds, // 忽略,仅是为了与Berkeley套接字兼容
_Inout_ fd_set *readfds, // 指向一个套接字集合,用来检查其可读性
_Inout_ fd_set *writefds, // 指向一个套接字集合,用来检查其可写性
_Inout_ fd_set *exceptfds, // 指向一个套接字集合,用来检查错误
_In_ const struct timeval *timeout // 指定此函数等待的最长时间,如果为NULL,则最长时间为无限大
);

返回值:
函数调用成功,返回发生网络事件的所有套接字数量的总和
如果超时返回0,代表在描述词状态改变前已超过timeout时间;
当有错误发生时则返回SOCKET_ERROR(-1).
  • select 包含三个 Socket 队列,分别代表: readfds,检查可读性,writefds,检查可写性,exceptfds,例外数据。
  • timeout 是 select 函数的返回时间。timeout 参数控制 select 完成的时间。若 timeout 参数为空指针,则 select 将一直阻塞到有一个描述字满足条件,否则的话,timeout 指向一个 timeval 结构,其中指定了 select 调用在返回前等待多长时间。如果 timeval 为 {0,0},则 select 立即返回,这可用于探询所选套接口的状态,如果处于这种状态,则 select 调用可认为是非阻塞的,且一切适用于非阻塞调用的假设都适用于它。

Windows 下有关 select 函数使用的 APIs:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include <WinSock2.h>

struct timeval {
long tv_sec; // 秒
long tv_usec; // 微秒
};

typedef struct fd_set {
u_int fd_count; // 下面数组的大小
SOCKET fd_array[FD_SETSIZE]; // 套接字句柄数组
} fd_set;

FD_ZERO(*fd_set); // 初始化 fd_set 为空集合。集合在使用前应该总是清空
FD_CLR(fd, *fd_set); // 从 fd_set 移除套接字 fd
FD_ISSET(fd, *fd_set); // 检查 fd 是不是 fd_set 的成员,如果是返回 TRUE
FD_SET(fd, *fd_set); // 添加套接字 fd 到集合 fd_set
/**************************************************************************/

 例如,我们想要检查一个套接字是否有数据需要(可以)接收,我们可以把套接字句柄加入可读性检查队列中,然后调用 select,如果,该套接字没有数据需要(可以)接收, select 函数会把该套接字从可读性检查队列中删除掉,所以我们只要检查该套接字句柄是否还存在于可读性队列中,就可以知道到底有没有数据需要(可以)接收了。

When Readable? When Writable?

下面是摘自 Microsoft 官网的 API[select function] 说明

  • The parameter readfds identifies the sockets that are to be checked for readability.

 If the socket is currently in the listen state, it will be marked as readable if an incoming connection request has been received such that an accept is guaranteed to complete without blocking.
 假如已调用了 listen,而且一个连接正在建立,那么 accept 函数调用会成功,accept 之前,用于监听的 socket 是可读的
 For other sockets, readability means that queued data is available for reading such that a call to recv, WSARecv, WSARecvFrom, or recvfrom is guaranteed not to block.
 对于其他不是用于监听的 Socket,有数据可以读入,则是可读的

  • The parameter writefds identifies the sockets that are to be checked for writability.
     If a socket is processing a connect call (nonblocking), a socket is writeable if the connection establishment successfully completes.
     如果已完成了对一个非阻塞 connect 调用的处理,连接能够成功就是可写
     If the socket is not processing a connect call, writability means a send, sendto, or WSASendto are guaranteed to succeed.
     对于其他不是调用 connect 的 Socket,可以发出数据就是可写的

 相信你看完官网的说明之后会对 什么 Socket 可读?什么 Socket 可写 更加捉摸不透(即使有注解)。下面是个人关于这方面的理解,其实所谓可读可写,可以借用 Linux 一切皆文件 的思想,任何一个 Socket,其实就是一个文件句柄,当连接建立起来,显然,关于连接两端的 Socket,都是可写的;收发两端均没有数据发送时,关于连接两端的 Socket,都是不可读的;当有一端有数据发送,另一端对应的 Socket 则是可读的。我们借用之前的 三次握手 建立连接流程图作进一步解释。


 通过设置非阻塞属性,可以实现 connectaccept 非阻塞调用。上图的阻塞阶段可以理解为无需阻塞了,而变成类似下面的代码流程通过 select 函数检测连接状态变化。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
int       rwCount;
fd_set rSSet, wSSet;
SOCKET listenSoc, connectSoc;

// listenSoc: bind, listen

// connectSoc: connect

rwCount = 0;
while (rwCount < 2){
// listen socket 可读,connect socket 可写 说明建立连接了
rdSSet.fd_count = 1;
rdSSet.fd_array[0] = listenSoc;
wrSSet.fd_count = 1;
wrSSet.fd_array[0] = connectSoc;
rwCount = select(0, &rdSSet, &wrSSet, NULL, &tv);
}

// listenSoc: accept

 在非阻塞的状态下,Server 依次通过 bindlisten 创建队列,监听客户端的连接。Client 通过 connect 与 Server 建立连接,并获得该连接对应 Socket 句柄;Server 端的监听队列则保留了与该连接对应的 Socket,通过 accept 获取该连接对应的 Socket 句柄。在此之后,Client,Server 两端便可通过与建立的连接对应的 Socket 实现通讯。

总结:

  • 通过 select 模式,实现了监听 socket 的 accept 和客户端的 read 之间,以及各个客户端之间的 read,可以不用一直阻塞在那,而是在有相应事件的时候再进行阻塞处理,把 accept 和 read 两个长阻塞转化为 select 一个长阻塞。
  • 使用 select 的好处是程序能够在单个线程内同时处理多个套接字连接,这避免了阻塞模式下的线程膨胀问题。

Using Select for Delay

 从上面的描述中我们可以看到,当 select 检测的 socket 状态没有发生改变时,假如设置了 timeout 时间,则会一直阻塞直到 timeout 时间到。利用 select 实现 delay 正是利用这样的原理。不难想象,我们需要先创建一些 socket,然后让用于实现 delay 的 select 检测 socket 处于某种状态(不可读或者不可写),然后通过 select 去检测该不可达状态,最终 select 超时来实现准确的 delay。
 下面通过我们项目中一个实际应用 Velodyne_player 作进一步说明。

  • Velodyne_player 是一个 .pcap 文件回播程序,实现的功能是,32线 Velodyne-HDL-32E 激光雷达采集的数据是 UDP 的数据包,通过 .pcap 文件保存;在日后的线下算法调试中通过 Velodyne_player 进行回播,解析采集的 .pcap 文件,然后通过 UDP 的方式发送,模拟 Velodyne-HDL-32E 激光雷达数据采集过程。
  • .pcap 文件中每一个数据包均带有时间戳,数据按帧(激光雷达一周扫描的数据为一帧)播放,所以可以获取每一帧的起始时间戳,进而获取每一帧的时间间隔。Velodyne_player 中我们通过计算当前帧与起始帧的时间间隔,然后利用系统时间,将其转化为系统绝对时间。接下来,问题就便成同步当前机器时间与播放帧系统绝对时间:
      1). 当前帧绝对时间滞后于机器时间,将当前帧数据立即发送
      2). 当前帧绝对时间超前于机器时间,通过 select 延迟超前的时间,再将当前帧数据发送出去

 1. 首先,需要创建用于 select 检测的,我们可以判断所处状态的 socket。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
// include needed MFC library hearder files
// ...

#include <WinSock2.h> // for mostly socket operation
#include <glib.h> // for g_get_current_time

#define ARBITRARY_START_PORT 11000

static LONG LastSocket = ARBITRARY_START_PORT;

/**
* Debug function: message box for print tempory data
*/
void printFunc(const char* info, int status){
CString tmp = " = ";
CString str;
str.Format("%d", status);
MessageBox(NULL, info + tmp + str, "", NULL);
}

/*
* 静态函数会被自动分配在一个一直使用的存储区,直到退出应用程序实例,避免了调用函数时压栈出栈,速度快很多
*/
static int lcm_internal_pipe_create(int filedes[2]){
int status, SocOpt, rwCount, nPort;
short Port;
// readability&writability check socket set
fd_set rdSSet, wrSSet;
SOCKET listenSoc, acceptSoc, connectSoc;
sockaddr_in listen_addr, connect_addr;
timeval tv;
/*
* listenSoc: 服务器端的socket
* connectSoc: 客户端的socket
*/
listenSoc = socket(PF_INET, SOCK_STREAM, 0);
connectSoc = socket(PF_INET, SOCK_STREAM, 0);

listen_addr.sin_family = AF_INET;
listen_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
/*
* This loop ensures that we pick up an UNUSED port. If anything else used this port,
* the entire lcm notification system melts down. The assumption is that we can't bind
* to an address in use once the SO_EXCLUSIVEADDRUSE has been set. If this isn't true,
* another method will need to be implemented.
*/
do{
nPort = InterlockedIncrement(&LastSocket); // Make sure we're using unique port
if (nPort > 65500){ // Wrapping, reset the port #
InterlockedCompareExchange(&LastSocket, ARBITRARY_START_PORT, nPort);
}
Port = (short)nPort;
listen_addr.sin_port = htons(Port);

SocOpt = 1;
status = setsockopt(listenSoc, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (const char *)&SocOpt, sizeof(SocOpt));
if (status)
continue;
status = bind(listenSoc, (LPSOCKADDR)&listen_addr, sizeof(listen_addr));
} while (status != 0);
/*
* 1. 非阻塞方式连接
* 设置非阻塞方式连接(http://blog.csdn.net/ludw508/article/details/8565203)
* 对于阻塞的套接字:0表示正确,SOCKET_ERROR表示错误
* 对于非阻塞套接字,不能立刻判断连接是否完成。函数会返回SOCKET_ERROR,但这并不表示出错
* 2. 请求队列只能允许一个客户端请求
*/
SocOpt = 1;
status = ioctlsocket(listenSoc, FIONBIO, (u_long *)&SocOpt);
status = listen(listenSoc, 1);

connect_addr.sin_family = AF_INET;
connect_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
connect_addr.sin_port = htons(Port);

SocOpt = 1;
status = ioctlsocket(connectSoc, FIONBIO, (u_long *)&SocOpt);
if (SOCKET_ERROR == status){
return -1;
}
status = connect(connectSoc, (LPSOCKADDR)&connect_addr, sizeof(connect_addr));

/**
* 使用 select 函数来决定连接请求的完成,通过查看套接字是否可写
*/
rwCount = 0;
tv.tv_sec = 0;
tv.tv_usec = 20 * 1000;
while (rwCount < 2){
rdSSet.fd_count = 1;
rdSSet.fd_array[0] = connectSoc;
wrSSet.fd_count = 1;
wrSSet.fd_array[0] = listenSoc;
rwCount = select(0, &wrSSet, &rdSSet, NULL, &tv);
#ifdef __TEST__
printFunc("rwCount =", rwCount);
#endif
}
// Both sockets are ready now to complete the connection.

int addr_len = sizeof(listen_addr);
acceptSoc = accept(listenSoc, (LPSOCKADDR)&listen_addr, &addr_len);

// Restore the sockets to blocking (default behavior).
SocOpt = 0;
status = ioctlsocket(connectSoc, FIONBIO, (u_long *)&SocOpt);
status = ioctlsocket(acceptSoc, FIONBIO, (u_long *)&SocOpt);

filedes[0] = (int)connectSoc;
filedes[1] = (int)acceptSoc;

return 0;
}

分析:

  • 结合前一节关于可读性,可写性的解释,结合上面具体的代码,调用 lcm_internal_pipe_create 获得 connectaccept 返回的 socket 句柄,两个 socket 是通过 listen socket 建立的连接的两端,连接成功建立,二者都是可写的;连接中没有任何一方发送数据,故二者都是不可读的。
  • 此外,用于建立连接的 listen socket,因为他的 listen 队列大小为 1,所以,最终该 socket 是不可写的;因为调用 accept 后,监听队列被取空,所以该监听 socket 也是不可读的。
  • lcm_internal_pipe_create 必须是静态函数,这样该函数的所有变量会被自动分配在一个一直使用的存储区,直到退出应用程序实例,保证 socket 句柄一直存在,一直有效。

 2. 开启 gthread 线程按帧为单位获取激光雷达数据包,并以 UDP 的方式发送出去。为了更准确的模拟激光雷达帧数据的播放时间,通过 select 实现恰当的延时,保证帧与帧之间播放的时间间隔,每一帧播放的时间与实际情况相符。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
/*
* Glib time control functions
* struct timeval: glib Structure used in select() call
*/
static inline int64_t timevalToInt64(struct timeval tv){
return (int64_t)tv.tv_sec * 1000000 + tv.tv_usec;
}
static int64_t timestamp_now(void){
GTimeVal tv;
g_get_current_time(&tv);
return (int64_t)tv.tv_sec * 1000000 + tv.tv_usec;
}

static int lcm_internal_pipe_close(int fd){
return closesocket((SOCKET)fd);
}

/*
* gthread线程函数
* 1. 通过NextEvent从.pcap文件获取下一帧激光数据
* 2. 通过select的准确timeout,同步播放时刻与抓包时间戳
* 3. 通过PublishEvent通过UDP发送出去,提供给Velodyne_viewer使用
*/
static void* timer_thread_func(void* user){
VeloPlayer* velo_player = reinterpret_cast<VeloPlayer*> (user);

int64_t abstime;
int64_t now;
int64_t starttime;
int64_t startabstime;
struct timeval sleep_tv;

int64_t sleep_utime;
fd_set fds;
int timer_pipe[2];
TRACE("timer_thread_func \n");
if (lcm_internal_pipe_create(timer_pipe) != 0){
TRACE("lcm_internal_pipe_create failed\n");
return NULL;
}

int status = 0;

startabstime = timestamp_now();

velo_player->NextEvent();
starttime = timevalToInt64(velo_player->m_currentTime);

while (velo_player->m_exitThread == 0){
velo_player->NextEvent();
abstime = timevalToInt64(velo_player->m_currentTime) - starttime + startabstime;
if (abstime < 0)
return NULL;

now = timestamp_now();
if (abstime >now){
sleep_utime = abstime - now;
sleep_tv.tv_sec = sleep_utime / 1000000;
sleep_tv.tv_usec = sleep_utime % 1000000;

// sleep until the next timed message, or until an abort message
/*
* select: 2rd parameter readfds identifies the sockets that are to be checked for readability
*/
FD_ZERO(&fds);
FD_SET(timer_pipe[0], &fds);
/*
* 测试可读性 可写性
* velo_player->timer_pipe[0/1]: connect socket/accept socket
* 已完成一个非阻塞connect的连接(可写)
* 没有数据可以接收(不可读 阻塞到超时,超时时间sleep_tv)
*/
//#define _SELECT_TEST_
//#define _SELECT_WRITE_TEST_
#ifdef _SELECT_WRITE_TEST_
status = select(0, 0, &fds, NULL, &sleep_tv);
#else
status = select(0, &fds, 0, NULL, &sleep_tv);
#endif
#ifdef _SELECT_TEST_
int error = WSAGetLastError();
printFunc("status =", status);
printFunc("error =", error);
#endif
// select timed out
if (!status){
velo_player->PublishEvent();
}
else{
TRACE("0 != status %d %d\n", status, WSAGetLastError());
break;
}
}
else{
if (abstime < now)
TRACE("abstime < now %ld\n", now - abstime);
velo_player->PublishEvent();
}
}
TRACE("Thread stoped! \n");
if (timer_pipe[0] >= 0)
lcm_internal_pipe_close(timer_pipe[0]);
if (timer_pipe[1] >= 0)
lcm_internal_pipe_close(timer_pipe[1]);

return NULL;
}

References:

文章目录
  1. 1. select 初识
  2. 2. When Readable? When Writable?
  3. 3. Using Select for Delay